bitkeeper revision 1.1062.3.2 (40f29f095EmGaaKsbz1zoQ1AZH6dQw)
authormjw@wray-m-3.hpl.hp.com <mjw@wray-m-3.hpl.hp.com>
Mon, 12 Jul 2004 14:24:09 +0000 (14:24 +0000)
committermjw@wray-m-3.hpl.hp.com <mjw@wray-m-3.hpl.hp.com>
Mon, 12 Jul 2004 14:24:09 +0000 (14:24 +0000)
Integrating save/migrate handling with xfrd.
Add suspend callback to save.

tools/libxc/xc_io.h
tools/python/xen/xend/XendClient.py
tools/python/xen/xend/XendDomain.py
tools/python/xen/xend/XendMigrate.py
tools/python/xen/xend/server/SrvDomain.py
tools/python/xen/xend/server/SrvDomainDir.py
tools/xfrd/Makefile
tools/xfrd/xen_domain.c
tools/xfrd/xfrd.c
tools/xfrd/xfrd.h

index 37febb52f18424977d1d8723671851dfae35002a..00ed66dc247a42f11e8c744b0f97405c234be00c 100644 (file)
@@ -1,6 +1,7 @@
 #ifndef __XC_XC_IO_H__
 #define __XC_XC_IO_H__
 
+#include <errno.h>
 #include "xc_private.h"
 #include "iostream.h"
 
@@ -12,8 +13,21 @@ typedef struct XcIOContext {
     IOStream *err;
     char *vmconfig;
     int vmconfig_n;
+    int (*suspend)(u32 domain, void *data);
+    void *data;
 } XcIOContext;
 
+static inline int xcio_suspend_domain(XcIOContext *ctxt){
+    int err = 0;
+
+    if(ctxt->suspend){
+        err = ctxt->suspend(ctxt->domain, ctxt->data);
+    } else {
+        err = -EINVAL;
+    }
+    return err;
+}
+
 static inline int xcio_read(XcIOContext *ctxt, void *buf, int n){
     int rc;
 
index f85c462cf581905f4d31d18d9754b106a92998b5..817d2818cef70a8bc88f055923ba3b0be56a1ced 100644 (file)
@@ -225,6 +225,11 @@ class Xend:
     def xend_domain(self, id):
         return xend_get(self.domainurl(id))
 
+    def xend_domain_configure(self, id, config):
+        return xend_call(self.domainurl(id),
+                         {'op'      : 'configure',
+                          'config'  : fileof(conf) })
+
     def xend_domain_unpause(self, id):
         return xend_call(self.domainurl(id),
                          {'op'      : 'unpause'})
index a106a273a43402d6561705ac3236df90e7b4cb25..7a9ab8d7f3f1f0775f5f039f2cd303382bc9ddfa 100644 (file)
@@ -311,6 +311,42 @@ class XendDomain:
             return dominfo
         deferred.addCallback(fn)
         return deferred
+
+    def domain_configure(self, id, config):
+        """Configure an existing domain. This is intended for internal
+        use by domain restore and migrate.
+
+        @param id:     domain id
+        @param config: configuration
+        @return: deferred
+        """
+        dom = int(id)
+        dominfo = self.domain_get(dom)
+        if not dominfo:
+            raise ValueError("Invalid domain: " + str(id))
+        if dominfo.config:
+            raise ValueError("Domain already configured: " + str(id))
+        def fn(dominfo):
+            self._add_domain(dominfo.id, dominfo)
+            return dominfo
+        deferred = dominfo.construct(config)
+        deferred.addCallback(fn)
+        return deferred
+    
+    def domain_restore(self, src, progress=0):
+        """Restore a domain from file.
+
+        @param src:      source file
+        @param progress: output progress if true
+        @return: deferred
+        """
+        
+        def fn(dominfo):
+            self._add_domain(dominfo.id, dominfo)
+            return dominfo
+        deferred = XendDomainInfo.vm_restore(src, progress=progress)
+        deferred.addCallback(fn)
+        return deferred
     
     def domain_get(self, id):
         """Get up-to-date info about a domain.
@@ -346,6 +382,8 @@ class XendDomain:
          - reboot: domain will restart.
          - halt: domain will not restart (even if has autorestart set).
 
+         Returns immediately.
+
         @param id:     domain id
         @param reason: shutdown type: poweroff, reboot, suspend, halt
         """
@@ -443,7 +481,7 @@ class XendDomain:
 
     def domain_destroy(self, id):
         """Terminate domain immediately.
-        Camcels any restart for the domain.
+        Cancels any restart for the domain.
 
         @param id: domain id
         """
@@ -456,6 +494,7 @@ class XendDomain:
         """Start domain migration.
 
         @param id: domain id
+        @return: deferred
         """
         # Need a cancel too?
         # Don't forget to cancel restart for it.
@@ -464,41 +503,16 @@ class XendDomain:
         return xmigrate.migrate_begin(dom, dst)
 
     def domain_save(self, id, dst, progress=0):
-        """Save domain state to file, destroy domain on success.
-        Leave domain running on error.
+        """Start saving a domain to file.
 
         @param id:       domain id
         @param dst:      destination file
         @param progress: output progress if true
+        @return: deferred
         """
         dom = int(id)
-        dominfo = self.domain_get(id)
-        if not dominfo:
-            return -1
-        vmconfig = sxp.to_string(dominfo.sxpr())
-        self.domain_pause(id)
-        eserver.inject('xend.domain.save', id)
-        try:
-            rc = xc.linux_save(dom=dom, state_file=dst,
-                               vmconfig=vmconfig, progress=progress)
-        except:
-            rc = -1
-        if rc == 0:
-            self.domain_destroy(id)
-        else:
-            self.domain_unpause(id)
-        return rc
-    
-    def domain_restore(self, src, progress=0):
-        """Restore domain from file.
-
-        @param src :     source file
-        @param progress: output progress if true
-        @return: domain object
-        """
-        dominfo = XendDomainInfo.vm_restore(src, progress=progress)
-        self._add_domain(dominfo.id, dominfo)
-        return dominfo
+        xmigrate = XendMigrate.instance()
+        return xmigrate.save_begin(dom, dst)
     
     def domain_pincpu(self, dom, cpu):
         """Pin a domain to a cpu.
index 29cb4bb61674ed3e6aea2b0061a9d75d43dcabae..f938a9afd2ec4e87fc62ace745120c50d33cbc5a 100644 (file)
@@ -1,5 +1,6 @@
 # Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
 
+import errno
 import sys
 import socket
 import time
@@ -14,9 +15,12 @@ import sxp
 import XendDB
 import EventServer; eserver = EventServer.instance()
 
+"""The port for the migrate/save daemon xfrd."""
 XFRD_PORT = 8002
 
+"""The transfer protocol major version number."""
 XFR_PROTO_MAJOR = 1
+"""The transfer protocol minor version number."""
 XFR_PROTO_MINOR = 0
 
 class Xfrd(Protocol):
@@ -55,16 +59,16 @@ class XfrdClientFactory(ClientFactory):
     """Factory for clients of the migration/save daemon xfrd.
     """
 
-    def __init__(self, minfo):
+    def __init__(self, xinfo):
         #ClientFactory.__init__(self)
-        self.minfo = minfo
+        self.xinfo = xinfo
 
     def startedConnecting(self, connector):
         print 'Started to connect', 'self=', self, 'connector=', connector
 
     def buildProtocol(self, addr):
         print 'buildProtocol>', addr
-        return Migrate(self.minfo)
+        return Xfrd(self.xinfo)
 
     def clientConnectionLost(self, connector, reason):
         print 'clientConnectionLost>', 'connector=', connector, 'reason=', reason
@@ -76,13 +80,15 @@ class XfrdInfo:
     """Abstract class for info about a session with xfrd.
     Has subclasses for save and migrate.
     """
-    
+
+    def __init__(self):
+        from xen.xend import XendDomain
+        self.xd = XendDomain.instance()
+        self.deferred = defer.Deferred()
+        
     def vmconfig(self):
         print 'vmconfig>'
-        from xen.xend import XendDomain
-        xd = XendDomain.instance()
-
-        dominfo = xd.domain_get(self.src_dom)
+        dominfo = self.xd.domain_get(self.src_dom)
         print 'vmconfig>', type(dominfo), dominfo
         if dominfo:
             val = sxp.to_string(dominfo.sxpr())
@@ -93,6 +99,8 @@ class XfrdInfo:
 
     def error(self, err):
         self.state = 'error'
+        if not self.deferred.called:
+            self.deferred.errback(err)
 
     def dispatch(self, xfrd, val):
         op = sxp.name(val)
@@ -100,14 +108,18 @@ class XfrdInfo:
         if op.startswith('xfr_'):
             fn = getattr(self, op, self.unknown)
         else:
-            fn = self.unknown()
-        fn(xfrd, val)
+            fn = self.unknown
+        val = fn(xfrd, val)
+        if val is not None:
+            sxp.show(val, out=self.transport)
 
     def unknown(self, xfrd, val):
         print 'unknown>', val
+        xfrd.loseConnection()
+        return None
 
     def xfr_err(self, xfrd, val):
-        # If we get an error with non-zero code the migrate failed.
+        # If we get an error with non-zero code the operation failed.
         # An error with code zero indicates hello success.
         print 'xfr_err>', val
         v = sxp.child(val)
@@ -116,21 +128,49 @@ class XfrdInfo:
         if not err: return
         self.error(err);
         xfrd.loseConnection()
+        #try:
+        #    self.xd.domain_unpause(self.src_dom)
+        #except:
+        #    print >>sys.stdout, "Error unpausing domain:", self.src_dom
+        return None
 
     def xfr_progress(self, val):
         print 'xfr_progress>', val
-
-    def xfr_domain_pause(self, val):
-        print 'xfr__domain_pause>', val
-
-    def xfr_domain_suspend(self, val):
-        print 'xfr_domain_suspend>', val
+        return None
+
+    def xfr_vm_pause(self, val):
+        print 'xfr_vm_pause>', val
+        try:
+            vmid = sxp.child0(val)
+            val = self.xd.domain_pause(vmid)
+        except:
+            val = errno.EINVAL
+        return ['xfr.err', val]
+
+    def xfr_vm_unpause(self, val):
+        print 'xfr_vm_unpause>', val
+        try:
+            vmid = sxp.child0(val)
+            val = self.xd.domain_unpause(vmid)
+        except:
+            val = errno.EINVAL
+        return ['xfr.err', val]
+
+    def xfr_vm_suspend(self, val):
+        print 'xfr_vm_suspend>', val
+        try:
+            vmid = sxp.child0(val)
+            val = self.xd.domain_shutdown(vmid, reason='suspend')
+        except:
+            val = errno.EINVAL
+        return ['xfr.err', val]
 
 class XendMigrateInfo(XfrdInfo):
     """Representation of a migrate in-progress and its interaction with xfrd.
     """
 
     def __init__(self, id, dom, host, port):
+        XfrdInfo.__init__(self)
         self.id = id
         self.state = 'begin'
         self.src_host = socket.gethostname()
@@ -139,7 +179,6 @@ class XendMigrateInfo(XfrdInfo):
         self.dst_port = port
         self.dst_dom = None
         self.start = 0
-        self.deferred = defer.Deferred()
         
     def sxpr(self):
         sxpr = ['migrate', ['id', self.id], ['state', self.state] ]
@@ -160,12 +199,15 @@ class XendMigrateInfo(XfrdInfo):
                       self.src_dom,
                       vmconfig,
                       self.dst_host,
-                      self.d.dst_port])
+                      self.dst_port])
         
     def xfr_migrate_ok(self, val):
         dom = int(sxp.child0(val))
         self.state = 'ok'
         self.dst_dom = dom
+        self.xd_domain_destroy(self.src_dom)
+        if not self.deferred.called:
+            self.deferred.callback(self)
 
     def connectionLost(self, reason=None):
         if self.state =='ok':
@@ -179,12 +221,12 @@ class XendSaveInfo(XfrdInfo):
     """
     
     def __init__(self, id, dom, file):
+        XfrdInfo.__init__(self)
         self.id = id
         self.state = 'begin'
         self.src_dom = dom
         self.file = file
         self.start = 0
-        self.deferred = defer.Deferred()
         
     def sxpr(self):
         sxpr = ['save',
@@ -204,6 +246,9 @@ class XendSaveInfo(XfrdInfo):
     def xfr_save_ok(self, val):
         dom = int(sxp.child0(val))
         self.state = 'ok'
+        self.xd_domain_destroy(self.src_dom)
+        if not self.deferred.called:
+            self.deferred.callback(self)
 
     def connectionLost(self, reason=None):
         if self.state =='ok':
@@ -217,7 +262,6 @@ class XendMigrate:
     """External api for interaction with xfrd for migrate and save.
     Singleton.
     """
-    # Represents migration in progress.
     # Use log for indications of begin/end/errors?
     # Need logging of: domain create/halt, migrate begin/end/fail
     # Log via event server?
@@ -226,8 +270,8 @@ class XendMigrate:
     
     def __init__(self):
         self.db = XendDB.XendDB(self.dbpath)
-        self.migrate = {}
-        self.migrate_db = self.db.fetchall("")
+        self.session = {}
+        self.session_db = self.db.fetchall("")
         self.id = 0
 
     def nextid(self):
@@ -235,44 +279,67 @@ class XendMigrate:
         return "%d" % self.id
 
     def sync(self):
-        self.db.saveall("", self.migrate_db)
+        self.db.saveall("", self.session_db)
 
-    def sync_migrate(self, id):
-        self.db.save(id, self.migrate_db[id])
+    def sync_session(self, id):
+        self.db.save(id, self.session_db[id])
 
     def close(self):
         pass
 
-    def _add_migrate(self, id, info):
-        self.migrate[id] = info
-        self.migrate_db[id] = info.sxpr()
-        self.sync_migrate(id)
+    def _add_session(self, id, info):
+        self.session[id] = info
+        self.session_db[id] = info.sxpr()
+        self.sync_session(id)
         #eserver.inject('xend.migrate.begin', info.sxpr())
 
-    def _delete_migrate(self, id):
+    def _delete_session(self, id):
         #eserver.inject('xend.migrate.end', id)
-        del self.migrate[id]
-        del self.migrate_db[id]
+        del self.session[id]
+        del self.session_db[id]
         self.db.delete(id)
 
-    def migrate_ls(self):
-        return self.migrate.keys()
+    def session_ls(self):
+        return self.session.keys()
 
-    def migrates(self):
-        return self.migrate.values()
+    def sessions(self):
+        return self.session.values()
 
-    def migrate_get(self, id):
-        return self.migrate.get(id)
+    def session_get(self, id):
+        return self.session.get(id)
+
+    def session_begin(self, info):
+        self._add_session(id, info)
+        mcf = XfrdClientFactory(info)
+        reactor.connectTCP('localhost', XFRD_PORT, mcf)
+        return info
     
     def migrate_begin(self, dom, host, port=XFRD_PORT):
+        """Begin to migrate a domain to another host.
+
+        @param dom:  domain
+        @param host: destination host
+        @param port: destination port
+        @return: deferred
+        """
         # Check dom for existence, not migrating already.
         # Subscribe to migrate notifications (for updating).
         id = self.nextid()
         info = XendMigrateInfo(id, dom, host, port)
-        self._add_migrate(id, info)
-        mcf = XfrdClientFactory(info)
-        reactor.connectTCP('localhost', XFRD_PORT, mcf)
-        return info
+        self.session_begin(info)
+        return info.deferred
+
+    def save_begin(self, dom, file):
+        """Begin saving a domain to file.
+
+        @param dom:  domain
+        @param file: destination file
+        @return: deferred
+        """
+        id = self.nextid()
+        info = XendSaveInfo(id, dom, file)
+        self.session_begin(info)
+        return info.deferred
 
 def instance():
     global inst
index b71dd63161341a7ea608c509a7942b0f10edc220..cb70cf41bc62672be6b30cb4087d95177a7796ba 100644 (file)
@@ -18,6 +18,14 @@ class SrvDomain(SrvDir):
         self.xd = XendDomain.instance()
         self.xconsole = XendConsole.instance()
 
+    def op_configure(self, op, req):
+        fn = FormFn(self.xd.domain_configure,
+                    [['dom', 'int'],
+                     ['config', 'sxp']])
+        val = fn(req.args, {'dom': self.dom.id})
+        #todo: may need to add ok and err callbacks.
+        return val
+
     def op_unpause(self, op, req):
         val = self.xd.domain_unpause(self.dom.id)
         return val
@@ -27,7 +35,6 @@ class SrvDomain(SrvDir):
         return val
 
     def op_shutdown(self, op, req):
-        #val = self.xd.domain_shutdown(self.dom.id)
         fn = FormFn(self.xd.domain_shutdown,
                     [['dom', 'int'],
                      ['reason', 'str']])
@@ -202,8 +209,22 @@ class SrvDomain(SrvDir):
         req.write('<form method="post" action="%s">' % req.prePathURL())
         req.write('<input type="submit" name="op" value="unpause">')
         req.write('<input type="submit" name="op" value="pause">')
-        req.write('<input type="submit" name="op" value="shutdown">')
         req.write('<input type="submit" name="op" value="destroy">')
+        req.write('</form>')
+
+        req.write('<form method="post" action="%s">' % req.prePathURL())
+        req.write('<input type="submit" name="op" value="shutdown">')
+        req.write('<input type="radio" name="reason" value="poweroff" checked>Poweroff<br>')
+        req.write('<input type="radio" name="reason" value="halt">Halt<br>')
+        req.write('<input type="radio" name="reason" value="reboot">Reboot<br>')
+        req.write('</form>')
+        
+        req.write('<form method="post" action="%s">' % req.prePathURL())
+        req.write('<br><input type="submit" name="op" value="save">')
+        req.write('To file: <input type="text" name="file">')
+        req.write('</form>')
+        
+        req.write('<form method="post" action="%s">' % req.prePathURL())
         req.write('<br><input type="submit" name="op" value="migrate">')
-        req.write('To: <input type="text" name="destination">')
+        req.write('To host: <input type="text" name="destination">')
         req.write('</form>')
index 719ea612ff4c302fb5c883d5a010b5c8f02b5b25..8ca623985002ffb1016bdfef0d933848c88bb815 100644 (file)
@@ -105,7 +105,10 @@ class SrvDomainDir(SrvDir):
 
     def op_restore(self, op, req):
         """Restore a domain from file.
+
+        @return: deferred
         """
+        #todo: return is deferred. May need ok and err callbacks.
         fn = FormFn(self.xd.domain_restore,
                     [['file', 'str']])
         val = fn(req.args)
@@ -154,9 +157,10 @@ class SrvDomainDir(SrvDir):
         req.write('<button type="submit" name="op" value="create">Create Domain</button>')
         req.write('Config <input type="file" name="config"><br>')
         req.write('</form>')
+        
         req.write('<form method="post" action="%s" enctype="multipart/form-data">'
                   % req.prePathURL())
-        req.write('<button type="submit" name="op" value="create">Restore Domain</button>')
+        req.write('<button type="submit" name="op" value="restore">Restore Domain</button>')
         req.write('State <input type="string" name="state"><br>')
         req.write('</form>')
         
index fe8412047193d3800e300f9c044305ae2105dce3..6e9b4b668186d471dc47bc4b815c18df2e5ebea4 100644 (file)
@@ -18,6 +18,9 @@ INCLUDES += -I $(XEN_LINUX_INCLUDE)
 vpath %.h      $(XEN_XU)
 INCLUDES += -I $(XEN_XU)
 
+vpath %.h      $(XEN_LIBXC)
+INCLUDES += -I $(XEN_LIBXC)
+
 vpath %c       $(XEN_LIBXUTIL)
 INCLUDES += -I $(XEN_LIBXUTIL)
 
@@ -28,7 +31,9 @@ UTIL_LIB_OBJ = $(UTIL_LIB_SRC:.c=.o)
 XFRD_PROG_OBJ = $(XFRD_PROG_SRC:.c=.o)
 XFRD_PROG_OBJ += $(UTIL_LIB)
 
-CPPFLAGS += -D _XEN_XFR_STUB_
+# Flag controlling whether to use stubs.
+# Define to use stubs, undefine to use the real Xen functions.
+#CPPFLAGS += -D _XEN_XFR_STUB_
 
 CFLAGS += -g
 CFLAGS += -Wall
@@ -38,15 +43,31 @@ CFLAGS += $(INCLUDES)
 CFLAGS += -Wp,-MD,.$(@F).d
 PROG_DEP = .*.d
 
-#LDFLAGS += -L $(COMPRESS_DIR) -lz
+#$(warning XFRD_PROG_OBJ= $(XFRD_PROG_OBJ))
+#$(warning UTIL_LIB= $(UTIL_LIB))
+#$(warning UTIL_LIB_OBJ= $(UTIL_LIB_OBJ))
+
+# Libraries for xfrd.
+XFRD_LIBS :=
+
+XFRD_LIBS += -L $(XEN_LIBXC) -lxc
+XFRD_LIBS += -L $(XEN_LIBXUTIL) -lxutil
+
+# zlib library.
+XFRD_LIBS += -lz
+
+CURL_FLAGS = $(shell curl-config --cflags)
+CURL_LIBS  = $(shell curl-config --libs)
+CFLAGS     += $(CURL_FLAGS)
+# libcurl libraries.
+XFRD_LIBS += $(CURL_LIBS)
 
-$(warning XFRD_PROG_OBJ= $(XFRD_PROG_OBJ))
-$(warning UTIL_LIB= $(UTIL_LIB))
-$(warning UTIL_LIB_OBJ= $(UTIL_LIB_OBJ))
+#$(warning XFRD_LIBS = $(XFRD_LIBS))
 
 all: xfrd
 
-xfrd: $(XFRD_PROG_OBJ) -lz
+xfrd: $(XFRD_PROG_OBJ)
+       $(CC) -o $@ $^ $(XFRD_LIBS)
 
 .PHONY: install
 install: xfrd
@@ -64,5 +85,6 @@ clean:
        $(RM) *.o *.a *.so *~ xfrd
        $(RM) $(PROG_DEP)
 
+$(XFRD_PROG_OBJ): Makefile
 -include $(PROG_DEP)
 
index 4c197e8396ee37b19d1933c05d6b13dbbc6ef7e9..9fe5def1bafd891d77ab0929f42b21b34ac7d8fd 100644 (file)
@@ -3,18 +3,43 @@
 #include <stdio.h>
 
 #ifndef _XEN_XFR_STUB_
-#include "dom0_defs.h"
-#include "mem_defs.h"
+#include "xc.h"
+#include "xc_io.h"
 #endif
 
 #include "xen_domain.h"
 #include "marshal.h"
 #include "xdr.h"
+#include "xfrd.h"
 
 #define MODULE_NAME "XFRD"
 #define DEBUG 1
 #include "debug.h"
 
+#ifndef _XEN_XFR_STUB_
+static int domain_suspend(u32 dom, void *data){
+    Conn *xend = data;
+
+    return xfr_vm_suspend(xend, dom);
+}
+
+static int xc_handle = 0;
+
+int xcinit(void){
+    if(xc_handle <= 0){
+        xc_handle = xc_interface_open();
+    }
+    return xc_handle;
+}
+
+void xcfini(void){
+    if(xc_handle > 0){
+        xc_interface_close(xc_handle);
+        xc_handle = 0;
+    }
+}
+#endif   
+
 /** Write domain state.
  *
  * At some point during this the domain is suspended, and then there's no way back.
  */
 int xen_domain_snd(Conn *xend, IOStream *io, uint32_t dom, char *vmconfig, int vmconfig_n){
     int err = 0;
+#ifdef _XEN_XFR_STUB_
     char buf[1024];
     int n, k, d, buf_n;
     dprintf("> dom=%d\n", dom);
-#ifdef _XEN_XFR_STUB_
     err = marshal_uint32(io, dom);
     if(err) goto exit;
     err = marshal_string(io, vmconfig, vmconfig_n);
@@ -43,6 +68,15 @@ int xen_domain_snd(Conn *xend, IOStream *io, uint32_t dom, char *vmconfig, int v
     
   exit:
 #else 
+    XcIOContext _ioctxt = {}, *ioctxt = &_ioctxt;
+    dprintf("> dom=%d\n", dom);
+    ioctxt->io = io;
+    ioctxt->info = iostdout;
+    ioctxt->err = iostderr;
+    ioctxt->data = xend;
+    ioctxt->suspend = domain_suspend;
+
+    err = xc_linux_save(xcinit(), ioctxt);
 #endif   
     dprintf("< err=%d\n", err);
     return err;
@@ -53,10 +87,10 @@ int xen_domain_snd(Conn *xend, IOStream *io, uint32_t dom, char *vmconfig, int v
  */
 int xen_domain_rcv(IOStream *io, uint32_t *dom, char **vmconfig, int *vmconfig_n){
     int err = 0;
+#ifdef _XEN_XFR_STUB_
     char buf[1024];
     int n, k, d, buf_n;
     dprintf(">\n");
-#ifdef _XEN_XFR_STUB_
     err = unmarshal_uint32(io, dom);
     if(err) goto exit;
     err = unmarshal_new_string(io, vmconfig, vmconfig_n);
@@ -72,19 +106,104 @@ int xen_domain_rcv(IOStream *io, uint32_t *dom, char **vmconfig, int *vmconfig_n
     }
   exit:
 #else    
+    XcIOContext _ioctxt = {}, *ioctxt = &_ioctxt;
+    dprintf(">\n");
+    ioctxt->io = io;
+    ioctxt->info = iostdout;
+    ioctxt->err = iostderr;
+
+    err = xc_linux_restore(xcinit(), ioctxt);
 #endif   
     dprintf("< err=%d\n", err);
     return err;
 }
 
-/** Configure a new domain. Talk to xend. Use libcurl?
+#include <curl/curl.h>
+
+static int do_curl_global_init = 1;
+
+static CURL *curlinit(void){
+    if(do_curl_global_init){
+        do_curl_global_init = 0;
+        curl_global_init(CURL_GLOBAL_ALL);
+    }
+    return curl_easy_init();
+}
+
+/** Configure a new domain. Talk to xend using libcurl.
  */
 int xen_domain_configure(uint32_t dom, char *vmconfig, int vmconfig_n){
     int err = 0;
-    dprintf(">\n");
+    CURL *curl = NULL;
+    CURLcode curlcode = 0;
+    char domainurl[128] = {};
+    int domainurl_n = sizeof(domainurl) - 1;
+    int n;
+    struct curl_httppost *form = NULL, *last = NULL;
+    CURLFORMcode formcode = 0;
+
+    dprintf("> dom=%u\n", dom);
+    curl = curlinit();
+    if(!curl){
+        eprintf("> Could not init libcurl\n");
+        err = -ENOMEM;
+        goto exit;
+    }
+    n = snprintf(domainurl, domainurl_n,
+                 "http://localhost:%d/xend/domain/%u", XEND_PORT, dom);
+    if(n <= 0 || n >= domainurl_n){
+        err = -ENOMEM;
+        eprintf("Out of memory in url.\n");
+        goto exit;
+    }
+    // Config field - set from vmconfig.
+    formcode = curl_formadd(&form, &last,
+                            CURLFORM_COPYNAME,     "config",
+                            CURLFORM_BUFFER,       "config",
+                            CURLFORM_BUFFERPTR,    vmconfig,
+                            CURLFORM_BUFFERLENGTH, vmconfig_n,
+                            CURLFORM_CONTENTTYPE,  "application/octet-stream",
+                            CURLFORM_END);
+    if(formcode){
+        eprintf("> Error adding config field.\n");
+        goto exit;
+    }
+    // Op field.
+    formcode = curl_formadd(&form, &last,
+                            CURLFORM_COPYNAME,     "op",
+                            CURLFORM_COPYCONTENTS, "configure",
+                            CURLFORM_END);
+
+    if(formcode){
+        eprintf("> Error adding op field.\n");
+        goto exit;
+    }
+    // No progress meter.
+    //curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1);
+    // Completely quiet.
+    //curl_easy_setopt(curl, CURLOPT_MUTE, 1);
+    // Set the URL.
+    curl_easy_setopt(curl, CURLOPT_URL, domainurl);
+    // POST the form.
+    curl_easy_setopt(curl, CURLOPT_HTTPPOST, form);
+    dprintf("> curl perform...\n");
 #ifdef _XEN_XFR_STUB_
-#else    
-#endif   
+    dprintf("> _XEN_XFR_STUB_ defined - not calling xend\n");
+    curlcode = 0;
+#else
+    curlcode = curl_easy_perform(curl);
+#endif
+  exit:
+    if(curl) curl_easy_cleanup(curl);
+    if(form) curl_formfree(form);
+    if(formcode){
+        dprintf("> formcode=%d\n", formcode);
+        err = -EINVAL;
+    }
+    if(curlcode){
+        dprintf("> curlcode=%d\n", curlcode);
+        err = -EINVAL;
+    }
     dprintf("< err=%d\n", err);
     return err;
 }
index 5dac7be0ba06fbd2d15163987ecf285570a4f816..26ef7819329727c79f7e7dbe7aa7fd9108cfa254 100644 (file)
@@ -93,22 +93,26 @@ Sxpr oxfr_configure; // (xfr.configure <vmid> <vmconfig>)
 Sxpr oxfr_err;       // (xfr.err <code>)
 Sxpr oxfr_hello;     // (xfr.hello <major> <minor>)
 Sxpr oxfr_migrate;   // (xfr.migrate <vmid> <vmconfig> <host> <port>)
-Sxpr oxfr_ok;        // (xfr.ok <value>)
+Sxpr oxfr_migrate_ok;// (xfr.migrate.ok <value>)
 Sxpr oxfr_progress;  // (xfr.progress <percent> <rate: kb/s>)
 Sxpr oxfr_save;      // (xfr.save <vmid> <vmconfig> <file>)
-Sxpr oxfr_suspend;   // (xfr.suspend <vmid>)
+Sxpr oxfr_save_ok;   // (xfr.save.ok)
+Sxpr oxfr_vm_suspend;// (xfr.vm.suspend <vmid>)
 Sxpr oxfr_xfr;       // (xfr.xfr <vmid>)
+Sxpr oxfr_xfr_ok;    // (xfr.xfr.ok <vmid>)
 
 void xfr_init(void){
-    oxfr_configure = intern("xfr.configure");
-    oxfr_err       = intern("xfr.err");
-    oxfr_hello     = intern("xfr.hello");
-    oxfr_migrate   = intern("xfr.migrate");
-    oxfr_ok        = intern("xfr.ok");
-    oxfr_progress  = intern("xfr.progress");
-    oxfr_save      = intern("xfr.save");
-    oxfr_suspend   = intern("xfr.suspend");
-    oxfr_xfr       = intern("xfr.xfr");
+    oxfr_configure      = intern("xfr.configure");
+    oxfr_err            = intern("xfr.err");
+    oxfr_hello          = intern("xfr.hello");
+    oxfr_migrate        = intern("xfr.migrate");
+    oxfr_migrate_ok     = intern("xfr.migrate.ok");
+    oxfr_progress       = intern("xfr.progress");
+    oxfr_save           = intern("xfr.save");
+    oxfr_save_ok        = intern("xfr.save.ok");
+    oxfr_vm_suspend     = intern("xfr.vm.suspend");
+    oxfr_xfr            = intern("xfr.xfr");
+    oxfr_xfr_ok         = intern("xfr.xfr.ok");
 }
 
 #ifndef TRUE
@@ -518,11 +522,27 @@ int xfr_send_xfr(Conn *conn, uint32_t vmid){
     return (err < 0 ? err : 0);
 }
 
-int xfr_send_ok(Conn *conn, uint32_t vmid){
+int xfr_send_xfr_ok(Conn *conn, uint32_t vmid){
     int err = 0;
 
     err = IOStream_print(conn->out, "(%s %d)",
-                         atom_name(oxfr_ok), vmid);
+                         atom_name(oxfr_xfr_ok), vmid);
+    return (err < 0 ? err : 0);
+}
+
+int xfr_send_migrate_ok(Conn *conn, uint32_t vmid){
+    int err = 0;
+
+    err = IOStream_print(conn->out, "(%s %d)",
+                         atom_name(oxfr_migrate_ok), vmid);
+    return (err < 0 ? err : 0);
+}
+
+int xfr_send_save_ok(Conn *conn){
+    int err = 0;
+
+    err = IOStream_print(conn->out, "(%s)",
+                         atom_name(oxfr_save_ok));
     return (err < 0 ? err : 0);
 }
 
@@ -530,10 +550,22 @@ int xfr_send_suspend(Conn *conn, uint32_t vmid){
     int err = 0;
 
     err = IOStream_print(conn->out, "(%s %d)",
-                         atom_name(oxfr_suspend), vmid);
+                         atom_name(oxfr_vm_suspend), vmid);
     return (err < 0 ? err : 0);
 }
 
+/** Suspend a vm on behalf of save/migrate.
+ */
+int xfr_vm_suspend(Conn *xend, uint32_t vmid){
+    int err = 0;
+    err = xfr_send_suspend(xend, vmid);
+    if(err) goto exit;
+    IOStream_flush(xend->out);
+    err = xfr_response(xend);
+  exit:
+    return err;
+}
+
 /** Get vm state. Send transfer message.
  *
  * @param peer connection
@@ -561,7 +593,7 @@ int xfr_send_state(XfrState *state, Conn *xend, Conn *peer){
         int errcode;
         err = intof(sxpr_childN(sxpr, 0, ONONE), &errcode);
         if(!err) err = errcode;
-    } else if(sxpr_elementp(sxpr, oxfr_ok)){
+    } else if(sxpr_elementp(sxpr, oxfr_xfr_ok)){
         // Ok - get the new domain id.
         err = intof(sxpr_childN(sxpr, 0, ONONE), &state->vmid_new);
         xfr_error(peer, err);
@@ -592,7 +624,7 @@ int xfr_send_done(XfrState *state, Conn *xend){
         err = xfr_error(xend, first_err);
     } else {
         // Report new domain id to xend.
-        err = xfr_send_ok(xend, state->vmid_new);
+        err = xfr_send_migrate_ok(xend, state->vmid_new);
     }  
 
     XfrState_set_err(state, err);
@@ -683,6 +715,11 @@ int xfr_save(Args *args, XfrState *state, Conn *xend, char *file){
         goto exit;
     }
     err = xen_domain_snd(xend, io, state->vmid, state->vmconfig, state->vmconfig_n);
+    if(err){
+        err = xfr_error(xend, err);
+    } else {
+        err = xfr_send_save_ok(xend);
+    }
   exit:
     if(io){
         IOStream_close(io);
@@ -709,7 +746,7 @@ int xfr_recv(Args *args, XfrState *state, Conn *peer){
     if(err) goto exit;
 
     // Report new domain id to peer.
-    err = xfr_send_ok(peer, state->vmid_new);
+    err = xfr_send_xfr_ok(peer, state->vmid_new);
     if(err) goto exit;
     // Get the final ok.
     err = xfr_response(peer);
index 2537fd67ab8fc3088529bc32c32fbe7fa649d0a9..9de464549b3aefa3d814e04f1f5f1489c242e1c5 100644 (file)
@@ -2,7 +2,7 @@
 #define _XFRD_XFRD_H_
 
 /** Xend port in host order. */
-#define XEND_PORT 8001
+#define XEND_PORT 8000
 
 /** Xfrd port in host order. */
 #define XFRD_PORT 8002
@@ -11,4 +11,6 @@
 #define XFR_PROTO_MAJOR   1
 #define XFR_PROTO_MINOR   0
 
+struct Conn;
+extern int xfr_vm_suspend(struct Conn *xend, uint32_t vmid);
 #endif